Aggregations on Dataset[Double]

Getting Spark up and running


In [1]:
classpath.add(
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" %% "spark-sql" % "2.0.2",
  "org.apache.spark" %% "spark-mllib" % "2.0.2"
);


143 new artifact(s)
143 new artifacts in macro
143 new artifacts in runtime
143 new artifacts in compile


In [2]:
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}


import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}

In [3]:
val spark = SparkSession.builder().master("local[*]").getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/08/02 19:31:09 INFO SparkContext: Running Spark version 2.0.2
17/08/02 19:31:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/08/02 19:31:09 INFO SecurityManager: Changing view acls to: amir.ziai
17/08/02 19:31:09 INFO SecurityManager: Changing modify acls to: amir.ziai
17/08/02 19:31:09 INFO SecurityManager: Changing view acls groups to: 
17/08/02 19:31:09 INFO SecurityManager: Changing modify acls groups to: 
17/08/02 19:31:09 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(amir.ziai); groups with view permissions: Set(); users  with modify permissions: Set(amir.ziai); groups with modify permissions: Set()
17/08/02 19:31:10 INFO Utils: Successfully started service 'sparkDriver' on port 53745.
17/08/02 19:31:10 INFO SparkEnv: Registering MapOutputTracker
17/08/02 19:31:10 INFO SparkEnv: Registering BlockManagerMaster
17/08/02 19:31:10 INFO DiskBlockManager: Created local directory at /private/var/folders/8n/xtn3n2c50tbgtcr2pnh21dl4002rn2/T/blockmgr-aeda5d2f-1877-43a3-9573-9a869e29ed79
17/08/02 19:31:10 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
17/08/02 19:31:10 INFO SparkEnv: Registering OutputCommitCoordinator
17/08/02 19:31:10 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/08/02 19:31:10 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.16.26.65:4040
17/08/02 19:31:10 INFO Executor: Starting executor ID driver on host localhost
17/08/02 19:31:10 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53746.
17/08/02 19:31:10 INFO NettyBlockTransferService: Server created on 172.16.26.65:53746
17/08/02 19:31:10 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.16.26.65, 53746)
17/08/02 19:31:10 INFO BlockManagerMasterEndpoint: Registering block manager 172.16.26.65:53746 with 2004.6 MB RAM, BlockManagerId(driver, 172.16.26.65, 53746)
17/08/02 19:31:10 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.16.26.65, 53746)
17/08/02 19:31:10 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
17/08/02 19:31:10 INFO SharedState: Warehouse path is 'file:/Users/amir.ziai/Dropbox/jupyter/spark-warehouse'.
spark: SparkSession = org.apache.spark.sql.SparkSession@171264db

In [4]:
import spark.implicits._


import spark.implicits._

Creating a Dataset[Double]


In [6]:
val data = spark.createDataset(Seq(1, 2, 3, 4, 5)).map(_.toDouble)


data: Dataset[Double] = [value: double]
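
As an aside, with spark.implicits._ in scope the same Dataset can be built straight from a Seq[Double] via the toDS() enrichment (an equivalent alternative, not what the cell above ran):

val data2 = Seq(1.0, 2.0, 3.0, 4.0, 5.0).toDS()  // also a Dataset[Double]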

Implicit aggregations exist on RDDs


In [7]:
data.rdd.mean()


res6: Double = 3.0

In [8]:
data.rdd.stdev()


res7: Double = 1.4142135623730951
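
The mean() and stdev() methods above aren't defined on RDD itself; they come from DoubleRDDFunctions, which Spark attaches to any RDD[Double] through an implicit conversion. A few more members of that API, with expected values for this data worked out by hand rather than taken from the notebook:

data.rdd.sum()          // 15.0
data.rdd.sampleStdev()  // sample (n - 1) standard deviation, 1.5811...
data.rdd.stats()        // single pass, returns a StatCounter with count, mean, stdev, min, max, ...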

But not on Dataset[Double]


In [8]:
data.mean()


Main.scala:66: value mean is not a member of org.apache.spark.sql.Dataset[Double]
data.mean()
     ^

Need to use sql.functions


In [14]:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{mean, stddev, sum}


import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{mean, stddev, sum}

Here's one way to do this directly with a Dataset


In [15]:
data.agg(mean(data("value"))).as[Double].collect().head


res14: Double = 3.0
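
As an alternative sketch, Spark 2.x also ships experimental typed aggregators in org.apache.spark.sql.expressions.scalalang.typed. Selecting a TypedColumn keeps the result a Dataset[Double], so the .as[Double] cast disappears:

import org.apache.spark.sql.expressions.scalalang.typed

// typed.avg takes a function on the element type instead of a column name
data.select(typed.avg((x: Double) => x)).collect().head  // should also be 3.0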

Hideous, right? Let's make this a bit more generic


In [16]:
def applyFunctionToDatasetOfDouble(data: Dataset[Double], function: Column => Column): Double = {
    // apply the aggregate to the Dataset's single "value" column and pull out the lone result
    data.agg(function(data("value"))).as[Double].collect().head
}


defined function applyFunctionToDatasetOfDouble

In [17]:
applyFunctionToDatasetOfDouble(data, mean)


res16: Double = 3.0
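
The same helper works with any Column => Column aggregate from sql.functions, e.g. (expected values worked out by hand):

import org.apache.spark.sql.functions.{min, max}

applyFunctionToDatasetOfDouble(data, sum)  // should be 15.0
applyFunctionToDatasetOfDouble(data, min)  // should be 1.0
applyFunctionToDatasetOfDouble(data, max)  // should be 5.0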

It turns out that stddev in sql.functions is an alias for stddev_samp and computes the sample standard deviation, whereas the RDD's stdev() computes the population standard deviation


In [19]:
applyFunctionToDatasetOfDouble(data, stddev)


res18: Double = 1.5811388300841898
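
To reproduce the RDD's population standard deviation on the Dataset side, sql.functions also exposes stddev_pop (alongside stddev_samp, for which stddev is just an alias):

import org.apache.spark.sql.functions.{stddev_pop, stddev_samp}

applyFunctionToDatasetOfDouble(data, stddev_pop)   // population std dev; should match data.rdd.stdev() at 1.4142...
applyFunctionToDatasetOfDouble(data, stddev_samp)  // sample std dev; same as stddev above, 1.5811...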

Is this worth it? Is the conversion from Dataset to RDD expensive?
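
It's not free: a Dataset's rows live in Spark's internal binary (Tungsten) format, and .rdd has to deserialize every row back into a JVM object, giving up Catalyst's optimizations along the way. A crude way to get a feel for the difference on a larger Dataset (a single-run wall-clock sketch, not a rigorous benchmark; time and big are made-up names):

// naive timing helper: single run, no warm-up
def time[A](label: String)(block: => A): A = {
  val start = System.nanoTime()
  val result = block
  println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
  result
}

val big = spark.range(10000000).map(_.toDouble)  // Dataset[Double] with 10M rows
time("RDD mean")(big.rdd.mean())
time("Dataset mean")(applyFunctionToDatasetOfDouble(big, mean))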